package io.sundial.executor.impl;

import io.sundial.core.AtomicRTCommand;
import io.sundial.core.lifecycle.exception.DestroyingException;
import io.sundial.executor.event.*;
import io.sundial.executor.exception.ExecutingException;
import io.sundial.job.*;

import java.util.Comparator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

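/**
 * Abstract job executor that is invoked passively: job commands are handed to it through
 * {@link #execute(JobCommand)} and are run on an internal thread pool.
 *
 * @author Payne 646742615@qq.com
 * 2018/12/20 21:24
 */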
public abstract class InvokingExecutor extends ReplyingExecutor {
    // Queue of pending executions, ordered by JobPriorityComparator. PriorityBlockingQueue is
    // unbounded; 11 is only its initial capacity.
    private final BlockingQueue<Runnable> workingQueue = new PriorityBlockingQueue<>(11, new JobPriorityComparator());
    // Creates worker threads and applies the threadName / threadDaemon / threadPriority settings below.
    private final ThreadFactory threadFactory = new JobThreadFactory();
    // Invoked by the pool when it refuses a task; always throws RejectedExecutionException.
    private final RejectedExecutionHandler rejectedExecutionHandler = new JobRejectedExecutionHandler();
    // Installed on every worker thread by JobThreadFactory.
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new JobUncaughtExceptionHandler();
    // Worker pool: core size = number of available processors, maximum size = Integer.MAX_VALUE,
    // keep-alive = 60 seconds (expressed in milliseconds).
    // NOTE(review): per the ThreadPoolExecutor documentation, a pool backed by an unbounded queue never
    // grows beyond its core size, so the maximum size presumably has no practical effect here and
    // rejection should only occur once the pool has been shut down - confirm this is intended.
    private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Integer.MAX_VALUE,
            60L * 1000L,
            TimeUnit.MILLISECONDS,
            workingQueue,
            threadFactory,
            rejectedExecutionHandler
    );
    // Callback handed to every JobExecution; removes the command from jobMapping and fires CANCELED.
    private final JobCancellation jobCancellation = new JobCommandCancellation();
    // Executions that have been submitted and not yet finished or canceled, keyed by their command.
    // Written by execute(), read by cancel(), cleared by JobCommandCallable and JobCommandCancellation.
    private final ConcurrentMap<JobCommand, JobExecution> jobMapping = new ConcurrentHashMap<>();

    // Base name for worker threads; JobThreadFactory appends "[index]". Read when a thread is created.
    private volatile String threadName = "sundial-executor-thread";
    // Daemon flag applied to new worker threads; null leaves the thread's default untouched.
    private volatile Boolean threadDaemon = false;
    // Priority applied to new worker threads; null or out-of-range values are ignored by JobThreadFactory.
    private volatile Integer threadPriority = Thread.NORM_PRIORITY;
    // Optional ordering for queued commands; when null, JobCommand.compareTo is used instead.
    private volatile Comparator<JobCommand> jobPriorityComparator;

    /**
     * Runs the parent's destroy logic and then shuts the worker pool down.
     * <p>
     * The pool is shut down in a {@code finally} block so that its worker threads (non-daemon by
     * default) are released even when {@code super.destroying()} fails; otherwise they would stay
     * alive with no owner left to stop them.
     * {@link ThreadPoolExecutor#shutdown()} is an orderly shutdown: already submitted jobs still run,
     * new ones are rejected, and the call does not wait for running jobs to finish.
     *
     * @throws DestroyingException if the parent's destroy logic fails
     */
    @Override
    protected void destroying() throws DestroyingException {
        try {
            super.destroying();
        } finally {
            // Idempotent, so a repeated destroy attempt after a failure is harmless.
            threadPoolExecutor.shutdown();
        }
    }

    /**
     * Translates a job status into the matching event object and fires it.
     * <p>
     * Only the arguments relevant to the given status are used: {@code jobResult} for
     * {@code COMPLETED} and {@code jobException} for {@code FAILED}. A status without a case below
     * fires nothing.
     *
     * @param jobStatus    status to report; must not be {@code null} (switching on null throws NPE)
     * @param jobCommand   command the event refers to
     * @param jobResult    result of the job, only used for {@code COMPLETED}
     * @param jobException failure of the job, only used for {@code FAILED}
     */
    private void fire(JobStatus jobStatus, JobCommand jobCommand, JobResult jobResult, JobException jobException) {
        switch (jobStatus) {
            case SUBMITTED:
                fire(new JobSubmittedEvent(jobCommand));
                return;
            case REJECTED:
                fire(new JobRejectedEvent(jobCommand));
                return;
            case CANCELED:
                fire(new JobCanceledEvent(jobCommand));
                return;
            case EXECUTING:
                fire(new JobExecutingEvent(jobCommand));
                return;
            case COMPLETED:
                // the only event that carries the job's result
                fire(new JobCompletedEvent(jobCommand, jobResult));
                return;
            case FAILED:
                // the only event that carries the job's failure
                fire(new JobFailedEvent(jobCommand, jobException));
                return;
        }
    }

    /**
     * Submits a job command to the worker pool.
     * <p>
     * The execution is registered in {@code jobMapping} and marked {@code SUBMITTED} <em>before</em>
     * it is handed to the pool. Doing it afterwards races with the worker thread: a job that finishes
     * quickly would remove its (not yet existing) mapping first, leaving a stale entry behind, and
     * could have a later status overwritten with {@code SUBMITTED}.
     * <p>
     * Exactly one event is fired per call, after the submission attempt: {@code SUBMITTED},
     * {@code REJECTED} or {@code FAILED}.
     * NOTE(review): because the event is fired after the hand-off, listeners may observe
     * {@code EXECUTING} before {@code SUBMITTED} - confirm whether that ordering matters.
     *
     * @param jobCommand the command to run
     * @return the execution handle; when the pool rejects the command the handle is returned with
     *         status {@code REJECTED} and the rejection attached as its job exception
     * @throws ExecutingException if submitting fails for any reason other than rejection
     */
    @Override
    public JobExecution execute(final JobCommand jobCommand) throws ExecutingException {
        return doReadingCommand(new AtomicRTCommand<JobExecution, ExecutingException>() {
            @Override
            public JobExecution run() throws ExecutingException {
                JobCommandCallable callable = new JobCommandCallable(jobCommand);
                JobExecution jobExecution = new JobExecution(callable, jobCommand, jobCancellation);
                JobStatus jobStatus = null;
                JobException jobException = null;
                try {
                    jobStatus = JobStatus.SUBMITTED;
                    jobExecution.setJobStatus(jobStatus);

                    // Publish the mapping first so the worker's cleanup always finds it.
                    jobMapping.put(jobCommand, jobExecution);

                    threadPoolExecutor.execute(jobExecution);
                    return jobExecution;
                } catch (RejectedExecutionException e) {
                    jobStatus = JobStatus.REJECTED;
                    jobException = new JobException(e);

                    // Roll back only the entry added above.
                    jobMapping.remove(jobCommand, jobExecution);

                    jobExecution.setJobStatus(jobStatus);
                    jobExecution.setJobException(jobException);
                    logger.warn("job command has been rejected", e);
                    return jobExecution;
                } catch (Throwable e) {
                    jobStatus = JobStatus.FAILED;
                    jobException = new JobException(e);

                    // Roll back only the entry added above.
                    jobMapping.remove(jobCommand, jobExecution);

                    jobExecution.setJobStatus(jobStatus);
                    jobExecution.setJobException(jobException);
                    logger.error("error occurred while submitting job command", e);
                    throw new ExecutingException(e);
                } finally {
                    fire(jobStatus, jobCommand, null, jobException);
                }
            }
        });
    }

    /**
     * Attempts to cancel a command that was submitted through this executor.
     *
     * @param jobCommand            the command to cancel
     * @param mayInterruptIfRunning passed through to {@code JobExecution.cancel}
     * @return {@code false} when no execution is registered for the command; otherwise whatever
     *         {@code JobExecution.cancel} returns
     */
    @Override
    public boolean cancel(JobCommand jobCommand, boolean mayInterruptIfRunning) {
        final JobExecution registered = jobMapping.get(jobCommand);
        if (registered == null) {
            // unknown, already finished or already canceled
            return false;
        }
        return registered.cancel(mayInterruptIfRunning);
    }

    /**
     * The unit of work that actually runs a job command on a worker thread.
     * <p>
     * It fires {@code EXECUTING}, looks the job up via {@code discover}, runs it with a freshly built
     * {@link JobContext}, and finally removes the command from {@code jobMapping} and fires
     * {@code COMPLETED} or {@code FAILED}.
     */
    private class JobCommandCallable implements Callable<JobResult> {
        private final JobCommand jobCommand;

        JobCommandCallable(JobCommand jobCommand) {
            this.jobCommand = jobCommand;
        }

        /**
         * Runs the job described by the command.
         * <p>
         * Failures while firing the {@code EXECUTING} event or the terminal event are logged and
         * otherwise ignored. An exception escaping from the {@code finally} block would replace the
         * job's own result or exception, so event delivery is kept from changing the outcome.
         *
         * @return the result produced by the job
         * @throws Exception whatever the job lookup or the job itself throws; it is rethrown
         *                   unchanged after being logged
         */
        @Override
        public JobResult call() throws Exception {
            return doReadingCommand(new AtomicRTCommand<JobResult, Exception>() {
                @Override
                public JobResult run() throws Exception {
                    JobStatus jobStatus = null;
                    JobResult jobResult = null;
                    JobException jobException = null;
                    try {
                        jobStatus = JobStatus.EXECUTING;
                        fire(jobStatus, jobCommand, null, null);
                    } catch (Exception e) {
                        logger.error("error occurred while replying job state", e);
                    }
                    try {
                        JobSharding jobSharding = jobCommand.getJobSharding();
                        String jobName = jobSharding.getJobName();
                        String jobGroup = jobSharding.getJobGroup();
                        Job job = discover(jobName, jobGroup);
                        JobContext jobContext = new JobContext(
                                jobCommand.getJobTime(),
                                jobSharding.getIndex(),
                                jobSharding.getTotal(),
                                jobSharding.getValue(),
                                jobCommand.getJobArguments(),
                                context
                        );
                        jobResult = job.run(jobContext);
                        jobStatus = JobStatus.COMPLETED;
                        return jobResult;
                    } catch (JobException e) {
                        logger.error("error occurred while running job command", e);
                        jobException = e;
                        jobStatus = JobStatus.FAILED;
                        throw e;
                    } catch (Throwable e) {
                        logger.error("error occurred while running job command", e);
                        // Wrapped only for the FAILED event; the original throwable is rethrown.
                        jobException = new JobException(e);
                        jobStatus = JobStatus.FAILED;
                        throw e;
                    } finally {
                        jobMapping.remove(jobCommand);
                        try {
                            fire(jobStatus, jobCommand, jobResult, jobException);
                        } catch (Exception e) {
                            // Same best-effort policy as the EXECUTING event above.
                            logger.error("error occurred while replying job state", e);
                        }
                    }
                }
            });
        }
    }

    /**
     * Cancellation callback passed to every {@link JobExecution} created by this executor.
     */
    private class JobCommandCancellation implements JobCancellation {

        /**
         * Removes the command from {@code jobMapping} and fires a {@code CANCELED} event.
         * Presumably called by {@code JobExecution} once a cancel request succeeds - confirm
         * against that class.
         *
         * @param jobCommand the command that was canceled
         */
        @Override
        public void onCanceled(JobCommand jobCommand) {
            jobMapping.remove(jobCommand);

            // A canceled job has neither a result nor a failure to report.
            final JobResult noResult = null;
            final JobException noFailure = null;
            fire(JobStatus.CANCELED, jobCommand, noResult, noFailure);
        }
    }

    /**
     * Thread factory for the worker pool. Each new thread picks up the enclosing executor's current
     * {@code threadName}, {@code threadDaemon} and {@code threadPriority} settings, so changes to
     * those settings only affect threads created afterwards.
     */
    private class JobThreadFactory implements ThreadFactory {
        /** Number of threads created so far; used as the index suffix of the thread name. */
        private final AtomicInteger count = new AtomicInteger();

        /**
         * Creates a worker thread for the given task.
         *
         * @param runnable the task the thread will run
         * @return a configured thread that has not been started
         */
        @Override
        public Thread newThread(Runnable runnable) {
            final Thread worker = new Thread(runnable);
            final int sequence = count.getAndIncrement();

            // Name: "<threadName>[<sequence>]"; a null base name keeps the JVM default name.
            final String baseName = InvokingExecutor.this.threadName;
            if (baseName != null) {
                worker.setName(baseName + "[" + sequence + "]");
            }

            // Daemon flag: null keeps whatever the new thread inherited.
            final Boolean daemon = InvokingExecutor.this.threadDaemon;
            if (daemon != null) {
                worker.setDaemon(daemon);
            }

            // Priority: null or values outside MIN_PRIORITY..MAX_PRIORITY are ignored.
            final Integer priority = InvokingExecutor.this.threadPriority;
            final boolean priorityUsable = priority != null
                    && priority >= Thread.MIN_PRIORITY
                    && priority <= Thread.MAX_PRIORITY;
            if (priorityUsable) {
                worker.setPriority(priority);
            }

            worker.setUncaughtExceptionHandler(uncaughtExceptionHandler);
            return worker;
        }
    }

    /**
     * Rejection policy of the worker pool: always aborts by throwing
     * {@link RejectedExecutionException}, which {@code execute} catches and reports as
     * {@code REJECTED}.
     */
    private class JobRejectedExecutionHandler implements RejectedExecutionHandler {

        /**
         * Signals the rejection to the submitter.
         *
         * @param runnable the task that could not be accepted
         * @param executor the pool that refused it
         * @throws RejectedExecutionException always; the message names the task and the pool so
         *                                    that the logged rejection can be diagnosed
         */
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
            throw new RejectedExecutionException("job " + runnable + " rejected from " + executor);
        }
    }

    private class JobUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

        @Override
        public void uncaughtException(Thread thread, Throwable throwable) {

        }
    }

    /**
     * Orders the entries of the working queue by their job commands.
     * <p>
     * Every queued {@link Runnable} is expected to be a {@link JobExecution}; anything else fails
     * with a {@link ClassCastException}.
     */
    private class JobPriorityComparator implements Comparator<Runnable> {

        /**
         * Compares two queued executions.
         *
         * @param workA first queued execution
         * @param workB second queued execution
         * @return the result of the configured {@code jobPriorityComparator}, or of
         *         {@code JobCommand.compareTo} when none is configured
         */
        @Override
        public int compare(Runnable workA, Runnable workB) {
            // Read the volatile field once so a single comparison uses one consistent comparator.
            final Comparator<JobCommand> custom = jobPriorityComparator;
            final JobCommand left = ((JobExecution) workA).getJobCommand();
            final JobCommand right = ((JobExecution) workB).getJobCommand();
            if (custom == null) {
                return left.compareTo(right);
            }
            return custom.compare(left, right);
        }
    }

    /**
     * Returns the core size of the worker pool.
     *
     * @return the value reported by the underlying {@link ThreadPoolExecutor}
     */
    public int getCorePoolSize() {
        return threadPoolExecutor.getCorePoolSize();
    }

    /**
     * Sets the core size of the worker pool by delegating to
     * {@link ThreadPoolExecutor#setCorePoolSize(int)}.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if the underlying pool rejects the value (for example a
     *                                  negative size)
     */
    public void setCorePoolSize(int corePoolSize) {
        threadPoolExecutor.setCorePoolSize(corePoolSize);
    }

    /**
     * Returns the maximum size of the worker pool.
     *
     * @return the value reported by the underlying {@link ThreadPoolExecutor}
     */
    public int getMaximumPoolSize() {
        return threadPoolExecutor.getMaximumPoolSize();
    }

    /**
     * Sets the maximum size of the worker pool by delegating to
     * {@link ThreadPoolExecutor#setMaximumPoolSize(int)}.
     * NOTE(review): the pool uses an unbounded queue, so this value presumably never influences
     * the number of threads actually created - confirm.
     *
     * @param maximumPoolSize the new maximum size
     * @throws IllegalArgumentException if the underlying pool rejects the value (for example a
     *                                  non-positive size or one below the core size)
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        threadPoolExecutor.setMaximumPoolSize(maximumPoolSize);
    }

    /**
     * Returns the keep-alive time of the worker pool.
     *
     * @return the keep-alive time in milliseconds
     */
    public long getKeepAliveTime() {
        return threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS);
    }

    /**
     * Sets the keep-alive time of the worker pool.
     *
     * @param keepAliveTime the new keep-alive time in milliseconds
     * @throws IllegalArgumentException if the underlying pool rejects the value (for example a
     *                                  negative time)
     */
    public void setKeepAliveTime(long keepAliveTime) {
        threadPoolExecutor.setKeepAliveTime(keepAliveTime, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the base name used for worker threads.
     *
     * @return the base name, possibly {@code null}
     */
    public String getThreadName() {
        return threadName;
    }

    /**
     * Sets the base name for worker threads. The thread factory reads it when a thread is created
     * and appends {@code "[index]"}, so existing threads keep their names.
     *
     * @param threadName the new base name; {@code null} leaves new threads with their default name
     */
    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    /**
     * Returns the daemon flag applied to new worker threads.
     *
     * @return the configured flag, possibly {@code null}
     */
    public Boolean getThreadDaemon() {
        return threadDaemon;
    }

    /**
     * Sets the daemon flag for worker threads. The thread factory reads it when a thread is
     * created, so existing threads are not affected.
     *
     * @param threadDaemon the new flag; {@code null} leaves the daemon status of new threads untouched
     */
    public void setThreadDaemon(Boolean threadDaemon) {
        this.threadDaemon = threadDaemon;
    }

    /**
     * Returns the priority applied to new worker threads.
     *
     * @return the configured priority, possibly {@code null}
     */
    public Integer getThreadPriority() {
        return threadPriority;
    }

    /**
     * Sets the priority for worker threads. The value is stored without validation; the thread
     * factory ignores {@code null} and anything outside
     * {@link Thread#MIN_PRIORITY}..{@link Thread#MAX_PRIORITY} when it creates a thread.
     *
     * @param threadPriority the new priority
     */
    public void setThreadPriority(Integer threadPriority) {
        this.threadPriority = threadPriority;
    }

    /**
     * Returns the comparator used to order queued job commands.
     *
     * @return the configured comparator, or {@code null} when commands are ordered by
     *         {@code JobCommand.compareTo}
     */
    public Comparator<JobCommand> getJobPriorityComparator() {
        return jobPriorityComparator;
    }

    /**
     * Sets the comparator used to order queued job commands. The working queue reads this field on
     * every comparison.
     * NOTE(review): changing it while commands are queued presumably leaves already queued entries
     * ordered by the previous comparator - confirm whether that matters.
     *
     * @param jobPriorityComparator the new comparator; {@code null} falls back to
     *                              {@code JobCommand.compareTo}
     */
    public void setJobPriorityComparator(Comparator<JobCommand> jobPriorityComparator) {
        this.jobPriorityComparator = jobPriorityComparator;
    }
}
